package com.lambkit.core.task.app;

import com.lambkit.core.CronExecutor;
import com.lambkit.core.LambkitResult;
import com.lambkit.core.LifecycleException;
import com.lambkit.core.LifecycleState;
import com.lambkit.core.pipeline.IValve;
import com.lambkit.core.task.ITaskInstance;
import com.lambkit.core.task.IWorkCenter;
import com.lambkit.core.task.WorkCenter;
import com.lambkit.core.task.WorkQueue;
import com.lambkit.util.Printer;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.*;

/**
 *
 * @author yangyong(孤竹行)
 */
public class AppWorkCenter extends WorkCenter<AppTask, AppTaskInstance> {
    private WorkQueue workQueue = new WorkQueue();
    private ConcurrentHashMap<String, AppTaskInstance> instances = new ConcurrentHashMap<String, AppTaskInstance>();
    private final ThreadPoolExecutor jobExecutor;
    private final CronExecutor cronExecutor;

    private long timeout = 1000 * 60 * 5;//5分钟

    public AppWorkCenter() {
        this(null);
    }

    public AppWorkCenter(ThreadPoolExecutor executor) {
        if(executor !=null) {
            jobExecutor = executor;
        } else {
            ArrayBlockingQueue<Runnable> threadWorkQueue = new ArrayBlockingQueue<Runnable>(10000);
            jobExecutor = new ThreadPoolExecutor(5, 30, 30 * 1000, TimeUnit.MILLISECONDS, threadWorkQueue);
        }
        cronExecutor = new CronExecutor(100, true) {
            @Override
            public void execute() {
                int size = jobExecutor.getQueue().size();
                if(size < 10) {
                    jobExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            AppTaskInstance task = (AppTaskInstance) workQueue.next();
                            if (task != null) {
                                Printer.print(this, "task", "execute task:" + task.getId());
                                try {
                                    task.getContext().setStartTime(System.currentTimeMillis());
                                    instances.put(task.getId(), (AppTaskInstance) task);
                                    task.start();
                                    task.getContext().setCurrentState(LifecycleState.STOPPED);
                                    task.getContext().setStopTime(System.currentTimeMillis());
                                } catch (LifecycleException e) {
                                    throw new RuntimeException(e);
                                }
                            }//if
                        }//run
                    });
                } else {
                    Printer.print(this, "task", "jobExecutor queue size:" + size);
                }//if
                Iterator<Map.Entry<String, AppTaskInstance>> iterator = instances.entrySet().iterator();
                while (iterator.hasNext()) {
                    Map.Entry<String, AppTaskInstance> entry = iterator.next();
                    AppTaskInstance taskInstance = entry.getValue();
                    long currentTime = System.currentTimeMillis();
                    if (taskInstance.getContext().getStopTime() + timeout > currentTime) {
                        //失效
                        iterator.remove();
                    }
                }//while
            }//execute
        };
    }

    @Override
    public void start() throws LifecycleException {
        cronExecutor.start();
    }

    @Override
    public void stop() throws LifecycleException {
        super.stop();
        cronExecutor.close();
        jobExecutor.shutdown();
    }

    @Override
    public void execute(ITaskInstance task) {
        workQueue.push(task);
        Printer.print(this, "task", "workQueue size:" + workQueue.size());
    }

    @Override
    public WorkQueue getQueue() {
        return workQueue;
    }

    @Override
    public LambkitResult getResult(String taskId) {
        return instances.get(taskId).getContext().getResult();
    }

    @Override
    public AppTaskContext getContext(String taskId) {
        AppTaskInstance taskInstance = instances.get(taskId);
        if (taskInstance != null) {
            return taskInstance.getContext();
        }
        return null;
    }

    @Override
    public AppTaskInstance poll(String taskId) {
        AppTaskInstance taskInstance = instances.get(taskId);
        if (taskInstance != null) {
            if(taskInstance.getContext().getCurrentState().equals(LifecycleState.STOPPED)) {
                instances.remove(taskId);
                return taskInstance;
            }
        }
        return null;
    }

    @Override
    public IWorkCenter addValve(Class<? extends IValve> clazz) {
        return this;
    }
}
